﻿#if NET40
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchedJoinBlock.cs
//
//
// A propagator block that groups individual messages of multiple types
// into tuples of arrays of those messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Threading.Tasks.Dataflow.Internal;

namespace System.Threading.Tasks.Dataflow
{
  #region -- class BatchedJoinBlock<T1, T2> --

  /// <summary>Provides a dataflow block that batches a specified number of inputs of potentially differing types
  /// provided to one or more of its targets.</summary>
  /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
  /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
  [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
  [DebuggerTypeProxy(typeof(BatchedJoinBlock<,>.DebugView))]
  public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>>, IDebuggerDisplay
  {
    #region @ Fields @

    /// <summary>The size of the batches generated by this BatchedJoin.</summary>
    private readonly Int32 _batchSize;

    /// <summary>State shared among the targets.</summary>
    private readonly BatchedJoinBlockTargetSharedResources _sharedResources;

    /// <summary>The target providing inputs of type T1.</summary>
    private readonly BatchedJoinBlockTarget<T1> _target1;

    /// <summary>The target providing inputs of type T2.</summary>
    private readonly BatchedJoinBlockTarget<T2> _target2;

    /// <summary>The source side.</summary>
    private readonly SourceCore<Tuple<IList<T1>, IList<T2>>> _source;

    #endregion

    #region @ Constructors @

    /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
    /// <param name="batchSize">The number of items to group into a batch.</param>
    /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
    public BatchedJoinBlock(Int32 batchSize) :
      this(batchSize, GroupingDataflowBlockOptions.Default)
    { }

    /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2}"/> with the specified configuration.</summary>
    /// <param name="batchSize">The number of items to group into a batch.</param>
    /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
    /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
    /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
    public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
    {
      // Validate arguments
      if (batchSize < 1) { throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive); }
      if (dataflowBlockOptions == null) { throw new ArgumentNullException(nameof(dataflowBlockOptions)); }
      if (!dataflowBlockOptions.Greedy) { throw new ArgumentException(SR.Argument_NonGreedyNotSupported, nameof(dataflowBlockOptions)); }
      if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
      {
        throw new ArgumentException(SR.Argument_BoundedCapacityNotSupported, nameof(dataflowBlockOptions));
      }
      Contract.EndContractBlock();

      // Store arguments
      _batchSize = batchSize;
      dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

      // Configure the source
      _source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
          this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());

      // The action to run when a batch should be created.  This is typically called
      // when we have a full batch, but it will also be called when we're done receiving
      // messages, and thus when there may be a few stragglers we need to make a batch out of.
      Action createBatchAction = () =>
      {
        if (_target1.Count > 0 || _target2.Count > 0)
        {
          _source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages()));
        }
      };

      // Configure the targets
      _sharedResources = new BatchedJoinBlockTargetSharedResources(
          batchSize, dataflowBlockOptions,
          createBatchAction,
          () =>
          {
            createBatchAction();
            _source.Complete();
          },
          _source.AddException,
          Complete);
      _target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
      _target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);

      // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
      // In those cases we need to fault the target half to drop its buffered messages and to release its
      // reservations. This should not create an infinite loop, because all our implementations are designed
      // to handle multiple completion requests and to carry over only one.
#if !NET40
      _source.Completion.ContinueWith((completed, state) =>
      {
        var thisBlock = ((BatchedJoinBlock<T1, T2>)state) as IDataflowBlock;
        Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        thisBlock.Fault(completed.Exception);
      }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
#else
      Action<Task> continuationAction = completed =>
      {
        var thisBlock = this as IDataflowBlock;
        Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        thisBlock.Fault(completed.Exception);
      };
      _source.Completion.ContinueWith(continuationAction, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
#endif

      // Handle async cancellation requests by declining on the target
      Common.WireCancellationToComplete(
          dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2>)state).CompleteEachTarget(), this);
#if FEATURE_TRACING
      var etwLog = DataflowEtwProvider.Log;
      if (etwLog.IsEnabled())
      {
        etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
      }
#endif
    }

    #endregion

    #region @ Properties @

    /// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2}"/>.</summary>
    public Int32 BatchSize { get { return _batchSize; } }

    /// <summary>Gets a target that may be used to offer messages of the first type.</summary>
    public ITargetBlock<T1> Target1 { get { return _target1; } }

    /// <summary>Gets a target that may be used to offer messages of the second type.</summary>
    public ITargetBlock<T2> Target2 { get { return _target2; } }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
    public Int32 OutputCount { get { return _source.OutputCount; } }

    #endregion

    #region - IDataflowBlock Members -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    public Task Completion { get { return _source.Completion; } }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    public void Complete()
    {
      Debug.Assert(_target1 != null, "_target1 not initialized");
      Debug.Assert(_target2 != null, "_target2 not initialized");

      _target1.Complete();
      _target2.Complete();
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception)
    {
      if (exception == null) { throw new ArgumentNullException(nameof(exception)); }
      Contract.EndContractBlock();

      Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
      Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
      Debug.Assert(_source != null, "_source not initialized");

      lock (_sharedResources._incomingLock)
      {
        if (!_sharedResources._decliningPermanently) { _source.AddException(exception); }
      }
      Complete();
    }

    #endregion

    #region - ISourceBlock Members -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
    [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
    public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, DataflowLinkOptions linkOptions)
    {
      return _source.LinkTo(target, linkOptions);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
    Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, out Boolean messageConsumed)
    {
      return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
    Boolean ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
    {
      return _source.ReserveMessage(messageHeader, target);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
    void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
    {
      _source.ReleaseReservation(messageHeader, target);
    }

    #endregion

    #region - IReceivableSourceBlock Members -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
    [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
    public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>>> filter, out Tuple<IList<T1>, IList<T2>> item)
    {
      return _source.TryReceive(filter, out item);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
    [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
    public Boolean TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>>> items) { return _source.TryReceiveAll(out items); }

    #endregion

    #region * CompleteEachTarget *

    /// <summary>Invokes Complete on each target</summary>
    private void CompleteEachTarget()
    {
      _target1.Complete();
      _target2.Complete();
    }

    #endregion

    #region - ToString -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
    public override String ToString()
    {
      return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
    }

    #endregion

    #region - IDebuggerDisplay Members -

    /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
    private Int32 OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }

    /// <summary>The data to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private Object DebuggerDisplayContent
    {
      get
      {
        return "{0}, BatchSize={1}, OutputCount={2}".FormatWith(
            Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
            BatchSize,
            OutputCountForDebugger);
      }
    }

    /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
    Object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

    #endregion

    #region * class DebugView *

    /// <summary>Provides a debugger type proxy for the Transform.</summary>
    private sealed class DebugView
    {
      /// <summary>The block being viewed.</summary>
      private readonly BatchedJoinBlock<T1, T2> _batchedJoinBlock;

      /// <summary>The source half of the block being viewed.</summary>
      private readonly SourceCore<Tuple<IList<T1>, IList<T2>>>.DebuggingInformation _sourceDebuggingInformation;

      /// <summary>Initializes the debug view.</summary>
      /// <param name="batchedJoinBlock">The batched join being viewed.</param>
      public DebugView(BatchedJoinBlock<T1, T2> batchedJoinBlock)
      {
        Debug.Assert(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
        _batchedJoinBlock = batchedJoinBlock;
        _sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
      }

      /// <summary>Gets the messages waiting to be received.</summary>
      public IEnumerable<Tuple<IList<T1>, IList<T2>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }

      /// <summary>Gets the number of batches created.</summary>
      public Int64 BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }

      /// <summary>Gets the number of items remaining to form a batch.</summary>
      public Int32 RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }

      /// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
      public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }

      /// <summary>Gets the first target.</summary>
      public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }

      /// <summary>Gets the second target.</summary>
      public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }

      /// <summary>Gets the task being used for output processing.</summary>
      public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

      /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
      public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }

      /// <summary>Gets whether the block is completed.</summary>
      public Boolean IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }

      /// <summary>Gets the block's Id.</summary>
      public Int32 Id { get { return Common.GetBlockId(_batchedJoinBlock); } }

      /// <summary>Gets the set of all targets linked from this block.</summary>
      public TargetRegistry<Tuple<IList<T1>, IList<T2>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }

      /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
      public ITargetBlock<Tuple<IList<T1>, IList<T2>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
    }

    #endregion
  }

  #endregion

  #region -- class BatchedJoinBlock<T1, T2, T3> --

  /// <summary>Provides a dataflow block that batches a specified number of inputs of potentially differing types
  /// provided to one or more of its targets.</summary>
  /// <typeparam name="T1">Specifies the type of data accepted by the block's first target.</typeparam>
  /// <typeparam name="T2">Specifies the type of data accepted by the block's second target.</typeparam>
  /// <typeparam name="T3">Specifies the type of data accepted by the block's third target.</typeparam>
  [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
  [DebuggerTypeProxy(typeof(BatchedJoinBlock<,,>.DebugView))]
  [SuppressMessage("Microsoft.Design", "CA1005:AvoidExcessiveParametersOnGenericTypes")]
  public sealed class BatchedJoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>, IDebuggerDisplay
  {
    #region @ Fields @

    /// <summary>The size of the batches generated by this BatchedJoin.</summary>
    private readonly Int32 _batchSize;

    /// <summary>State shared among the targets.</summary>
    private readonly BatchedJoinBlockTargetSharedResources _sharedResources;

    /// <summary>The target providing inputs of type T1.</summary>
    private readonly BatchedJoinBlockTarget<T1> _target1;

    /// <summary>The target providing inputs of type T2.</summary>
    private readonly BatchedJoinBlockTarget<T2> _target2;

    /// <summary>The target providing inputs of type T3.</summary>
    private readonly BatchedJoinBlockTarget<T3> _target3;

    /// <summary>The source side.</summary>
    private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>> _source;

    #endregion

    #region @ Constructors @

    /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
    /// <param name="batchSize">The number of items to group into a batch.</param>
    /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
    public BatchedJoinBlock(Int32 batchSize) :
      this(batchSize, GroupingDataflowBlockOptions.Default)
    { }

    /// <summary>Initializes this <see cref="BatchedJoinBlock{T1,T2,T3}"/> with the specified configuration.</summary>
    /// <param name="batchSize">The number of items to group into a batch.</param>
    /// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BatchedJoinBlock{T1,T2}"/>.</param>
    /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="batchSize"/> must be positive.</exception>
    /// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
    public BatchedJoinBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
    {
      // Validate arguments
      if (batchSize < 1) { throw new ArgumentOutOfRangeException(nameof(batchSize), SR.ArgumentOutOfRange_GenericPositive); }
      if (dataflowBlockOptions == null) { throw new ArgumentNullException(nameof(dataflowBlockOptions)); }
      if (!dataflowBlockOptions.Greedy ||
          dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
      {
        throw new ArgumentException(SR.Argument_NonGreedyNotSupported, nameof(dataflowBlockOptions));
      }
      Contract.EndContractBlock();

      // Store arguments
      _batchSize = batchSize;
      dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

      // Configure the source
      _source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
          this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());

      // The action to run when a batch should be created.  This is typically called
      // when we have a full batch, but it will also be called when we're done receiving
      // messages, and thus when there may be a few stragglers we need to make a batch out of.
      Action createBatchAction = () =>
      {
        if (_target1.Count > 0 || _target2.Count > 0 || _target3.Count > 0)
        {
          _source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages(), _target3.GetAndEmptyMessages()));
        }
      };

      // Configure the targets
      _sharedResources = new BatchedJoinBlockTargetSharedResources(
          batchSize, dataflowBlockOptions,
          createBatchAction,
          () =>
          {
            createBatchAction();
            _source.Complete();
          },
          _source.AddException,
          Complete);
      _target1 = new BatchedJoinBlockTarget<T1>(_sharedResources);
      _target2 = new BatchedJoinBlockTarget<T2>(_sharedResources);
      _target3 = new BatchedJoinBlockTarget<T3>(_sharedResources);

      // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
      // In those cases we need to fault the target half to drop its buffered messages and to release its
      // reservations. This should not create an infinite loop, because all our implementations are designed
      // to handle multiple completion requests and to carry over only one.
#if !NET40
      _source.Completion.ContinueWith((completed, state) =>
      {
        var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state) as IDataflowBlock;
        Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        thisBlock.Fault(completed.Exception);
      }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
#else
      Action<Task> continuationAction = completed =>
      {
        var thisBlock = this as IDataflowBlock;
        Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        thisBlock.Fault(completed.Exception);
      };
      _source.Completion.ContinueWith(continuationAction, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
#endif
      // Handle async cancellation requests by declining on the target
      Common.WireCancellationToComplete(
          dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2, T3>)state).CompleteEachTarget(), this);
#if FEATURE_TRACING
      var etwLog = DataflowEtwProvider.Log;
      if (etwLog.IsEnabled())
      {
        etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
      }
#endif
    }

    #endregion

    #region @ Properties @

    /// <summary>Gets the size of the batches generated by this <see cref="BatchedJoinBlock{T1,T2,T3}"/>.</summary>
    public Int32 BatchSize { get { return _batchSize; } }

    /// <summary>Gets a target that may be used to offer messages of the first type.</summary>
    public ITargetBlock<T1> Target1 { get { return _target1; } }

    /// <summary>Gets a target that may be used to offer messages of the second type.</summary>
    public ITargetBlock<T2> Target2 { get { return _target2; } }

    /// <summary>Gets a target that may be used to offer messages of the third type.</summary>
    public ITargetBlock<T3> Target3 { get { return _target3; } }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
    public Int32 OutputCount { get { return _source.OutputCount; } }

    #endregion

    #region - IDataflowBlock Members -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    public Task Completion { get { return _source.Completion; } }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
    public void Complete()
    {
      Debug.Assert(_target1 != null, "_target1 not initialized");
      Debug.Assert(_target2 != null, "_target2 not initialized");
      Debug.Assert(_target3 != null, "_target3 not initialized");

      _target1.Complete();
      _target2.Complete();
      _target3.Complete();
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
    void IDataflowBlock.Fault(Exception exception)
    {
      if (exception == null) { throw new ArgumentNullException(nameof(exception)); }
      Contract.EndContractBlock();

      Debug.Assert(_sharedResources != null, "_sharedResources not initialized");
      Debug.Assert(_sharedResources._incomingLock != null, "_sharedResources._incomingLock not initialized");
      Debug.Assert(_source != null, "_source not initialized");

      lock (_sharedResources._incomingLock)
      {
        if (!_sharedResources._decliningPermanently) _source.AddException(exception);
      }
      Complete();
    }

    #endregion

    #region - ISourceBlock Members -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
    public IDisposable LinkTo(ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, DataflowLinkOptions linkOptions)
    {
      return _source.LinkTo(target, linkOptions);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
    Tuple<IList<T1>, IList<T2>, IList<T3>> ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target, out Boolean messageConsumed)
    {
      return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
    Boolean ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
    {
      return _source.ReserveMessage(messageHeader, target);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
    void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
    {
      _source.ReleaseReservation(messageHeader, target);
    }

    #endregion

    #region - IReceivableSourceBlock Members -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
    [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
    public Boolean TryReceive(Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter, out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
    {
      return _source.TryReceive(filter, out item);
    }

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
    [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
    public Boolean TryReceiveAll(out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items) { return _source.TryReceiveAll(out items); }

    #endregion

    #region * CompleteEachTarget *

    /// <summary>Invokes Complete on each target</summary>
    private void CompleteEachTarget()
    {
      _target1.Complete();
      _target2.Complete();
      _target3.Complete();
    }

    #endregion

    #region - ToString -

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
    public override String ToString()
    {
      return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
    }

    #endregion

    #region - IDebuggerDisplay Members -

    /// <summary>Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.</summary>
    private Int32 OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }

    /// <summary>The data to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private Object DebuggerDisplayContent
    {
      get
      {
        return "{0}, BatchSize={1}, OutputCount={2}".FormatWith(
            Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
            BatchSize,
            OutputCountForDebugger);
      }
    }

    /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
    Object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

    #endregion

    #region * class DebugView *

    /// <summary>Provides a debugger type proxy for the Transform.</summary>
    private sealed class DebugView
    {
      /// <summary>The block being viewed.</summary>
      private readonly BatchedJoinBlock<T1, T2, T3> _batchedJoinBlock;

      /// <summary>The source half of the block being viewed.</summary>
      private readonly SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>.DebuggingInformation _sourceDebuggingInformation;

      /// <summary>Initializes the debug view.</summary>
      /// <param name="batchedJoinBlock">The batched join being viewed.</param>
      public DebugView(BatchedJoinBlock<T1, T2, T3> batchedJoinBlock)
      {
        Debug.Assert(batchedJoinBlock != null, "Need a block with which to construct the debug view.");
        _sourceDebuggingInformation = batchedJoinBlock._source.GetDebuggingInformation();
        _batchedJoinBlock = batchedJoinBlock;
      }

      /// <summary>Gets the messages waiting to be received.</summary>
      public IEnumerable<Tuple<IList<T1>, IList<T2>, IList<T3>>> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }

      /// <summary>Gets the number of batches created.</summary>
      public Int64 BatchesCreated { get { return _batchedJoinBlock._sharedResources._batchesCreated; } }

      /// <summary>Gets the number of items remaining to form a batch.</summary>
      public Int32 RemainingItemsForBatch { get { return _batchedJoinBlock._sharedResources._remainingItemsInBatch; } }

      /// <summary>Gets the size of the batches generated by this BatchedJoin.</summary>
      public Int32 BatchSize { get { return _batchedJoinBlock._batchSize; } }

      /// <summary>Gets the first target.</summary>
      public ITargetBlock<T1> Target1 { get { return _batchedJoinBlock._target1; } }

      /// <summary>Gets the second target.</summary>
      public ITargetBlock<T2> Target2 { get { return _batchedJoinBlock._target2; } }

      /// <summary>Gets the second target.</summary>
      public ITargetBlock<T3> Target3 { get { return _batchedJoinBlock._target3; } }

      /// <summary>Gets the task being used for output processing.</summary>
      public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

      /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
      public GroupingDataflowBlockOptions DataflowBlockOptions { get { return (GroupingDataflowBlockOptions)_sourceDebuggingInformation.DataflowBlockOptions; } }

      /// <summary>Gets whether the block is completed.</summary>
      public Boolean IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }

      /// <summary>Gets the block's Id.</summary>
      public Int32 Id { get { return Common.GetBlockId(_batchedJoinBlock); } }

      /// <summary>Gets the set of all targets linked from this block.</summary>
      public TargetRegistry<Tuple<IList<T1>, IList<T2>, IList<T3>>> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }

      /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
      public ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
    }

    #endregion
  }

  #endregion
}
#endif